跳到主要内容

Go 整合 Zookeeper 做服务发现

大部分内容转载自 手把手教你使用Go基于zookeeper编写服务发现

使用 Go 语言实现一下服务发现的注册和发现功能。

如图所示,我们要提供 api.user 这样的服务,这个服务有3个节点,每个节点有不一样的服务地址,这 3 个节点各自将自己的服务注册进 zk,然后消费者进行读取 zk 得到 api.user 的服务地址,任选一个节点地址进行服务调用。为了简单化,这里就没有提供权重参数了。在一个正式的服务发现里一般都有权重参数,用于调整服务节点之间的流量分配。

引入依赖,这里使用 samuel/go-zookeeper,虽然这个库已经不再维护了,但是这里只是学习 Zookeeper 的使用,所以无所谓了

go get -u github.com/samuel/go-zookeeper/zk

首先我们定义一个 ServiceNode 结构,这个结构数据会存储在节点的 data 中,表示服务发现的地址信息。

type ServiceNode struct {
Name string `json:"name"` // 服务名称,这里是user
Host string `json:"host"`
Port int `json:"port"`
}

再定义一个服务发现的客户端结构体 SdClient。

type SdClient struct {
zkServers []string // 多个节点地址
zkRoot string // 服务根节点,这里是 /api
conn *zk.Conn // zk的客户端连接
}

编写构造器,创建根节点

func NewClient(zkServers []string, zkRoot string, timeout int) (*SdClient, error) {
client := new(SdClient)
client.zkServers = zkServers
client.zkRoot = zkRoot

// 连接服务器,设置连接的服务器地址和超时时间
conn, _, err := zk.Connect(zkServers, time.Duration(timeout)*time.Second)
if err != nil {
return nil, err
}

client.conn = conn
// 创建服务根节点
if err := client.ensureRoot(); err != nil {
client.Close()
return nil, err
}

return client, nil

}

// 关闭连接,释放临时节点
func (s *SdClient) Close() {
s.conn.Close()
}

// 创建根节点
func (s *SdClient) ensureRoot() error {
exists, _, err := s.conn.Exists(s.zkRoot)
if err != nil {
return err
}

if !exists {
// 这个 ACL 是 Access Control List,访问控制表,这里 WorldACL 和 PermAll 表示所有人都拥有全部权限
_, err := s.conn.Create(s.zkRoot, []byte(""), 0, zk.WorldACL(zk.PermAll))
if err != nil && err != zk.ErrNodeExists {
return err
}
}

return nil
}

值得注意的是代码中的 Create 调用可能会返回节点已存在错误,这是正常现象,因为会存在多进程同时创建节点的可能。如果创建根节点出错,还需要及时关闭连接。

我们不关心节点的权限控制,所以使用 zk.WorldACL(zk.PermAll) 表示该节点没有权限限制。Create 参数中的 flag = 0 表示这是一个持久化的普通节点。

接下来我们编写服务注册方法

代码如下:

// 服务注册
func (s *SdClient) Register(node *ServiceNode) error {
if err := s.ensureName(node.Name); err != nil {
return err
}

// 注意,这里创建的 path 与上面的 ensureName 创建的节点名称不一样(多个了 /n)
path := s.zkRoot + "/" + node.Name + "/n"
data, err := json.Marshal(node)
if err != nil {
return err
}
// 创建一个保护顺序临时 ProtectedEphemeralSequential 子节点,同时将地址信息存储在节点中。
_, err = s.conn.CreateProtectedEphemeralSequential(path, data, zk.WorldACL(zk.PermAll))
if err != nil {
return err
}
return nil
}

func (s *SdClient) ensureName(name string) error {
path := s.zkRoot + "/" + name
exists, _, err := s.conn.Exists(path)
if err != nil {
return err
}
if !exists {
_, err := s.conn.Create(path, []byte(""), 0, zk.WorldACL(zk.PermAll))
if err != nil && err != zk.ErrNodeExists {
return err
}
}
return nil
}

先要创建 /api/user 节点作为服务列表的父节点(ensureName 方法)。

然后再创建一个保护顺序临时 ProtectedEphemeralSequential 子节点,同时将地址信息存储在节点中。就是利用这种临时节点的特点来注册服务。

什么叫保护顺序临时节点,首先它是一个临时节点,会话关闭后节点自动消失。其它它是个顺序节点,zookeeper 自动在名称后面增加自增后缀,确保节点名称的唯一性。同时还是个保护性节点,节点前缀增加了 GUID 字段,确保断开重连后临时节点可以和客户端状态对接上。

注意看注册 ProtectedEphemeralSequential 节点的路径后面跟了个 /n

接下来我们实现消费者获取服务列表方法

func (s *SdClient) GetNodes(name string) ([]*ServiceNode, error) {
path := s.zkRoot + "/" + name
// 获取子节点名称
childs, _, err := s.conn.Children(path)
if err != nil {
if err == zk.ErrNoNode {
return []*ServiceNode{}, nil
}
return nil, err
}

nodes := []*ServiceNode{}

for _, child := range childs {
fullPath := path + "/" + child
data, _, err := s.conn.Get(fullPath)
if err != nil {
if err == zk.ErrNoNode {
continue
}
return nil, err
}

node := new(ServiceNode)
err = json.Unmarshal(data, node)
if err != nil {
return nil, err
}
nodes = append(nodes, node)
}

return nodes, nil
}

获取服务节点列表时,我们先获取子节点的名称列表,然后依次读取内容拿到服务地址。因为获取子节点名称和获取子节点内容不是一个原子操作,所以在调用 Get 获取内容时可能会出现节点不存在错误,这是正常现象。

将以上代码凑在一起,一个简单的服务发现包装就实现了。

最后我们看看如果使用以上代码,为了方便起见,我们将多个服务提供者和消费者写在一个main方法里。

func main() {
// ZK 服务器地址列表
servers := []string{"192.168.0.101:2118", "192.168.0.102:2118", "192.168.0.103:2118"}
client, err := NewClient(servers, "/api", 10)
if err != nil {
panic(err)
}

defer client.Close()
node1 := &ServiceNode{"user", "127.0.0.1", 4000}
node2 := &ServiceNode{"user", "127.0.0.1", 4001}
node3 := &ServiceNode{"user", "127.0.0.1", 4002}

if err := client.Register(node1); err != nil {
panic(err)
}

if err := client.Register(node2); err != nil {
panic(err)
}

if err := client.Register(node3); err != nil {
panic(err)
}
nodes, err := client.GetNodes("user")

if err != nil {
panic(err)
}

for _, node := range nodes {
fmt.Println(node.Host, node.Port)
}
}

值得注意的是使用时一定要在进程退出前调用 Close 方法,否则 zookeeper 的会话不会立即关闭,服务器创建的临时节点也就不会立即消失,而是要等到 timeout 之后服务器才会清理。